package xiaoa.java.jms.rabbitMq;

import java.io.File;
import java.util.Arrays;
import java.util.Date;


import java.util.HashMap;
import java.util.Map;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.FileUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import xiaoa.java.log.L;
import xiaoa.java.utils.time.DateFormatUtils;

/**
 * 队列父类
 * @author xiaoa
 * @date 2017年10月21日 上午10:38:56
 * @version V1.0
 *
 */
public class QueueBase {
	
	// 通道
	Channel channel = null;
	
	// 队列名字
	String queueName = null;
	
	// 路由器名字
	String exchange;
	
	// 路径
	String path;
	
	
	
	
	
	public QueueBase(String exchange,String path , String queueName) throws Throwable{
		
		init(exchange, path, queueName);

	}
	
	
	public QueueBase() throws Throwable{

		init();
	}
	
	/**
	 * 初始化
	 * @Title: init
	 * @throws Throwable
	 * @author xiaoa
	 */
	public void init() throws Throwable{
		
		if (channel == null){
			  Connection connection = Factory.getFactory().newConnection();  
		      channel = connection.createChannel();  
		      
		      L.info("初始化连接成功  ：" + channel);
		}
		
	} 
	
	/**
	 * 初始化
	 * @Title: init
	 * @param exchange
	 * @param path
	 * @param queueName
	 * @throws Throwable
	 * @author xiaoa
	 */
	public void init(String exchange,String path , String queueName) throws Throwable{
		
		  init();
	      
	      bindQueue(exchange, path, queueName);
	      this.exchange  = exchange;
	      this.path      = path;
	      this.queueName = queueName;
	      
	      L.info("初始化队列成功  ：exchange = " + exchange + " path = " + path + " queueName = " + queueName );
		
	} 
	
	
	/**
	 * 绑定queue
	 * @Title: bindQueue
	 * @param exchange
	 * @param path
	 * @param queueName
	 * @author xiaoa
	 */
	 void bindQueue(String exchange,String path , String queueName)throws Throwable{
		
		  // 设置路由器
	      channel.exchangeDeclare(exchange, "topic", true);
	      
	      // 声明队列  
	      channel.queueDeclare(queueName, true, false, false, null);  
	      
	      // 队列绑定到路由器上
	      channel.queueBind(queueName, exchange, path );
	      
	      // 设置均匀分配
	      channel.basicQos(1);
		
	}
	 
	/**
	 * 重新绑定
	 * @Title: reBindQueue
	 * @param exchange
	 * @param path
	 * @param queueName
	 * @throws Throwable
	 * @author xiaoa
	 */
	void reBindQueue(String exchange,String path , String queueName)throws Throwable{
			
		  // 设置路由器
	      channel.exchangeDeclare(exchange, "topic", true);
	      
	      // 声明队列  
	      channel.queueDeclare(queueName, true, false, false, null);  
	      
	      // 队列绑定到路由器上
	      channel.queueBind(queueName, exchange, path );
	      
	      // 设置均匀分配
	      channel.basicQos(1);
		
	}
	
	
    /**
     * 发送信息
     * @Title: doSendMessage
     * @param exchange
     * @param routingKey
     * @param obj
     * @throws Throwable
     * @author xiaoa
     */
	public void doSendMessage(String exchange,String routingKey,Object obj)throws Throwable{
		doSendMessage(exchange, routingKey, obj,null);
	}

	
	/**
	 * 发送信息
	 * @Title: doSendMessage
	 * @param obj
	 * @author xiaoa
	 */
	public void doSendMessage(String exchange,String routingKey,Object obj , Map<String, Object> headers)throws Throwable{
		
		if (obj == null){
			throw new RuntimeException("obj is null") ;
		}
		
		// 转为json
		String json = JSON.toJSONString(obj);
		
		byte[] bs = json.getBytes();
		
		Map<String, Object> newHeaders = new HashMap<>(); 
		newHeaders.put("time", DateFormatUtils.format_yyyyMMddhhssSSS(new Date()));
		
		if (headers != null && headers.size() > 0){
			
			for (String key : headers.keySet()){
				
				Object value = headers.get(key);
				
				if (value != null){
					
					newHeaders.put(key, value);
				}
				
			}
			
		}
 		
		// 设置请求头
		AMQP.BasicProperties prop = new AMQP.BasicProperties("text/plain", null, newHeaders, null, null, null, null, null, null, null, null, null, null, null);
		
		// 发送消息
		doSendMessage(exchange, routingKey, bs, prop);
		
				
	}
	
	
	/**
	 * 发送
	 * @Title: doSendMessage
	 * @param exchange
	 * @param routingKey
	 * @param bs
	 * @throws Throwable
	 * @author xiaoa
	 */
	public void doSendMessage(String exchange,String routingKey,byte[] bs , AMQP.BasicProperties prop  )throws Throwable{
		long  startTime = System.currentTimeMillis();

		String errorMsg = null;

		try {
			
			// 发送消息
			channel.basicPublish(exchange, routingKey,prop , bs);
			
		} catch (Exception e) {
	
			JSONObject  jsonObj = new JSONObject();
			jsonObj.put("exchange", exchange);
			jsonObj.put("routingKey", routingKey);
			jsonObj.put("bs", Base64.encodeBase64(bs));
			jsonObj.put("prop" , prop);
			FileUtils.writeLines(new File("sendErrorQueue.log") , Arrays.asList(jsonObj.toJSONString() + "\n") ,true );
			
			errorMsg = xiaoa.java.utils.ExceptionUtils.exceptionToString(e);
			
		}
		
		L.info(startTime , System.currentTimeMillis() ,"doSendMessage:[" + exchange + ":" + routingKey + "] " + (errorMsg != null ? errorMsg : "") + "length = " + bs.length );

	}
	
	
	/**
	 * 发送信息到错误队列
	 * @Title: doSendExceMssage
	 * @param delivery
	 * @author xiaoa
	 */
    public void doSendExceMssage(QueueingConsumer.Delivery delivery )throws Throwable{
    	
    	 byte[] body = delivery.getBody();
    	 
    	 BasicProperties properties =  delivery.getProperties();
    	 
    	 if (properties == null){
    		properties = new BasicProperties();
    	 }
    	 
    	 Map<String,Object>  headers =  delivery.getProperties().getHeaders();
    	 if (headers == null){
    		 headers = new HashMap<>();
    	 }
    	 
    	 headers.put("old_exchange", delivery.getEnvelope().getExchange());
    	 headers.put("old_routingKey", delivery.getEnvelope().getRoutingKey());
    	 headers.put("old_deliveryTag", delivery.getEnvelope().getDeliveryTag());

		 BasicProperties newProperties = new BasicProperties(properties.getContentType(),
				properties.getContentEncoding(), headers, properties.getDeliveryMode(), properties.getPriority(), null,
				properties.getReplyTo(), properties.getExpiration(), null, null, null, null, null, null);    	 
    	 
		Config  config =  Config.newInstance();
		 
		 doSendMessage(config.getErrorExchange(), config.getErrorRoutingKey(),  body , newProperties);
    	
    }
	
    
	

}
